{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "6x1ypzczQCwy"
   },
   "source": [
    "# Simple TFX Pipeline for Vertex Pipelines\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "_VuwrlnvQJ5k"
   },
   "source": [
    "This notebook-based tutorial will create a simple TFX pipeline and run it using\n",
    "Google Cloud Vertex Pipelines.  This notebook is based on the TFX pipeline\n",
    "built in\n",
    "[Simple TFX Pipeline Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple).\n",
    "\n",
    "Google Cloud Vertex Pipelines helps you to automate, monitor, and govern\n",
    "your ML systems by orchestrating your ML workflow in a serverless manner. You\n",
    "can define your ML pipelines using Python with TFX, and then execute your\n",
    "pipelines on Google Cloud. See\n",
    "[Vertex Pipelines introduction](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)\n",
    "to learn more about Vertex Pipelines."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "fwZ0aXisoBFW"
   },
   "source": [
    "## Setup\n",
    "### Install python packages"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "WC9W_S-bONgl"
   },
   "source": [
    "We will install required Python packages including TFX and KFP to author ML\n",
    "pipelines and submit jobs to Vertex Pipelines."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "iyQtljP-qPHY",
    "tags": []
   },
   "outputs": [],
   "source": [
    "!pip install --upgrade pip \n",
    "!pip uninstall tensorflow tensorflow-tensorboard tensorflow-io tensorflow-cloud -y \n",
    "!pip install tensorflow==1.15.5 \n",
    "!pip uninstall tfx -y \n",
    "!pip install --upgrade \"tfx[kfp]<2\"\n",
    "\n",
    "# You may see package dependency errors in the output below. You may ignore these to run the cells of this notebook."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "EwT0nov5QO1M"
   },
   "source": [
    "#### Restart the runtime\n",
    "\n",
    "Restart the runtime to ensure the following cells use the updated versions."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "-CRyIL4LVDlQ"
   },
   "source": [
    "You can restart the runtime with following cell:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "KHTSzMygoBF6"
   },
   "outputs": [],
   "source": [
    "# docs_infra: no_execute\n",
    "import sys\n",
    "if not 'google.colab' in sys.modules:\n",
    "  # Automatically restart kernel after installs\n",
    "  import IPython\n",
    "  app = IPython.Application.instance()\n",
    "  app.kernel.do_shutdown(True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "3_SveIKxaENu"
   },
   "source": [
    "Check the package versions."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "Xd-iP9wEaENu",
    "tags": []
   },
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "print('TensorFlow version: {}'.format(tf.__version__))\n",
    "from tfx import v1 as tfx\n",
    "print('TFX version: {}'.format(tfx.__version__))\n",
    "import kfp\n",
    "print('KFP version: {}'.format(kfp.__version__))\n",
    "\n",
    "# You may see a WARNING issued. You can safely ignore this until the TFX and KFP versions are displayed in the cell output."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "aDtLdSkvqPHe"
   },
   "source": [
    "### Set up variables\n",
    "\n",
    "We will set up some variables used to customize the pipelines below. Following\n",
    "information is required:\n",
    "\n",
    "* GCP Project id. You can find your Project ID in the panel with your lab instructions.\n",
    "* GCP Region to run pipelines. For more information about the regions that\n",
    "Vertex Pipelines is available in, see the\n",
    "[Vertex AI locations guide](https://cloud.google.com/vertex-ai/docs/general/locations#feature-availability).\n",
    "* Google Cloud Storage Bucket to store pipeline outputs.\n",
    "\n",
    "**Enter required values in the cell below before running it**.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "EcUseqJaE2XN",
    "tags": []
   },
   "outputs": [],
   "source": [
    "GOOGLE_CLOUD_PROJECT = '[your-project_id]'    \n",
    "GOOGLE_CLOUD_REGION = '[your-lab-region]'     \n",
    "GCS_BUCKET_NAME = GOOGLE_CLOUD_PROJECT + '-gcs'\n",
    "\n",
    "if not (GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_REGION and GCS_BUCKET_NAME):\n",
    "    from absl import logging\n",
    "    logging.error('Please set all required parameters.')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "GAaCPLjgiJrO"
   },
   "source": [
    "Set `gcloud` to use your project."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "VkWdxe4TXRHk",
    "tags": []
   },
   "outputs": [],
   "source": [
    "!gcloud config set project {GOOGLE_CLOUD_PROJECT}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "CPN6UL5CazNy",
    "tags": []
   },
   "outputs": [],
   "source": [
    "PIPELINE_NAME = 'penguin-vertex-pipelines'\n",
    "\n",
    "# Path to various pipeline artifact.\n",
    "PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(\n",
    "    GCS_BUCKET_NAME, PIPELINE_NAME)\n",
    "\n",
    "# Paths for users' Python module.\n",
    "MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(\n",
    "    GCS_BUCKET_NAME, PIPELINE_NAME)\n",
    "\n",
    "# Paths for input data.\n",
    "DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)\n",
    "\n",
    "# This is the path where your model will be pushed for serving.\n",
    "SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)\n",
    "\n",
    "print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))\n",
    "print('MODULE_ROOT: {}'.format(MODULE_ROOT))\n",
    "print('DATA_ROOT: {}'.format(DATA_ROOT))\n",
    "print('SERVING_MODEL_DIR: {}'.format(SERVING_MODEL_DIR))\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "8F2SRwRLSYGa"
   },
   "source": [
    "### Prepare example data\n",
    "The dataset we are using is the\n",
    "[Palmer Penguins dataset](https://allisonhorst.github.io/palmerpenguins/articles/intro.html).\n",
    "\n",
    "There are four numeric features in this dataset:\n",
    "\n",
    "* culmen_length_mm\n",
    "* culmen_depth_mm\n",
    "* flipper_length_mm\n",
    "* body_mass_g\n",
    "\n",
    "All features were already normalized\n",
    "to have range [0,1]. We will build a classification model which predicts the\n",
    "`species` of penguins."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "11J7XiCq6AFP"
   },
   "source": [
    "We need to make our own copy of the dataset. Because TFX ExampleGen reads\n",
    "inputs from a directory, we need to create a directory and copy dataset to it\n",
    "on GCS."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "4fxMs6u86acP",
    "tags": []
   },
   "outputs": [],
   "source": [
"!gcloud storage cp gs://download.tensorflow.org/data/palmer_penguins/penguins_processed.csv {DATA_ROOT}/"   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "ASpoNmxKSQjI"
   },
   "source": [
    "Take a quick look at the CSV file."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "-eSz28UDSnlG",
    "tags": []
   },
   "outputs": [],
   "source": [
    "!gcloud storage cat {DATA_ROOT}/penguins_processed.csv | head"   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You should be able to see five values. `species` is one of 0, 1 or 2, and all other features should have values between 0 and 1."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "nH6gizcpSwWV"
   },
   "source": [
    "## Create a pipeline\n",
    "\n",
    "TFX pipelines are defined using Python APIs. We will define a pipeline which\n",
    "consists of three components:\n",
    "\n",
    "* CsvExampleGen: Reads in data files and convert them to TFX internal format for further processing. There are multiple ExampleGens for various formats. In this tutorial, we will use CsvExampleGen which takes CSV file input.\n",
    "* Trainer: Trains an ML model. Trainer component requires a model definition code from users. You can use TensorFlow APIs to specify how to train a model and save it in a _savedmodel format.\n",
    "* Pusher: Copies the trained model outside of the TFX pipeline. Pusher component can be thought of an deployment process of the trained ML model.\n",
    "\n",
    "Our pipeline will be almost identical to a basic [TFX pipeline](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple).\n",
    "\n",
    "The only difference is that we don't need to set `metadata_connection_config`\n",
    "which is used to locate\n",
    "[ML Metadata](https://www.tensorflow.org/tfx/guide/mlmd) database. Because\n",
    "Vertex Pipelines uses a managed metadata service, users don't need to care\n",
    "of it, and we don't need to specify the parameter.\n",
    "\n",
    "Before actually define the pipeline, we need to write a model code for the\n",
    "Trainer component first."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "lOjDv93eS5xV"
   },
   "source": [
    "### Write model code.\n",
    "\n",
    "We will create a simple DNN model for classification using TensorFlow Keras API. This model training code will be saved to a separate file.\n",
    "\n",
    "In this tutorial we will use __Generic Trainer__ of TFX which support Keras-based models. You need to write a Python file containing run_fn function, which is the entrypoint for the `Trainer` component."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "aES7Hv5QTDK3",
    "tags": []
   },
   "outputs": [],
   "source": [
    "_trainer_module_file = 'penguin_trainer.py'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "Gnc67uQNTDfW",
    "tags": []
   },
   "outputs": [],
   "source": [
    "%%writefile {_trainer_module_file}\n",
    "\n",
    "# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple\n",
    "\n",
    "from typing import List\n",
    "from absl import logging\n",
    "import tensorflow as tf\n",
    "from tensorflow import keras\n",
    "from tensorflow_transform.tf_metadata import schema_utils\n",
    "\n",
    "\n",
    "from tfx import v1 as tfx\n",
    "from tfx_bsl.public import tfxio\n",
    "\n",
    "from tensorflow_metadata.proto.v0 import schema_pb2\n",
    "\n",
    "_FEATURE_KEYS = [\n",
    "    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'\n",
    "]\n",
    "_LABEL_KEY = 'species'\n",
    "\n",
    "_TRAIN_BATCH_SIZE = 20\n",
    "_EVAL_BATCH_SIZE = 10\n",
    "\n",
    "# Since we're not generating or creating a schema, we will instead create\n",
    "# a feature spec.  Since there are a fairly small number of features this is\n",
    "# manageable for this dataset.\n",
    "_FEATURE_SPEC = {\n",
    "    **{\n",
    "        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)\n",
    "           for feature in _FEATURE_KEYS\n",
    "       },\n",
    "    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)\n",
    "}\n",
    "\n",
    "\n",
    "def _input_fn(file_pattern: List[str],\n",
    "              data_accessor: tfx.components.DataAccessor,\n",
    "              schema: schema_pb2.Schema,\n",
    "              batch_size: int) -> tf.data.Dataset:\n",
    "  \"\"\"Generates features and label for training.\n",
    "\n",
    "  Args:\n",
    "    file_pattern: List of paths or patterns of input tfrecord files.\n",
    "    data_accessor: DataAccessor for converting input to RecordBatch.\n",
    "    schema: schema of the input data.\n",
    "    batch_size: representing the number of consecutive elements of returned\n",
    "      dataset to combine in a single batch\n",
    "\n",
    "  Returns:\n",
    "    A dataset that contains (features, indices) tuple where features is a\n",
    "      dictionary of Tensors, and indices is a single Tensor of label indices.\n",
    "  \"\"\"\n",
    "  return data_accessor.tf_dataset_factory(\n",
    "      file_pattern,\n",
    "      tfxio.TensorFlowDatasetOptions(\n",
    "          batch_size=batch_size, label_key=_LABEL_KEY),\n",
    "      schema=schema).repeat()\n",
    "\n",
    "\n",
    "def _make_keras_model() -> tf.keras.Model:\n",
    "  \"\"\"Creates a DNN Keras model for classifying penguin data.\n",
    "\n",
    "  Returns:\n",
    "    A Keras Model.\n",
    "  \"\"\"\n",
    "  # The model below is built with Functional API, please refer to\n",
    "  # https://www.tensorflow.org/guide/keras/overview for all API options.\n",
    "  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]\n",
    "  d = keras.layers.concatenate(inputs)\n",
    "  for _ in range(2):\n",
    "    d = keras.layers.Dense(8, activation='relu')(d)\n",
    "  outputs = keras.layers.Dense(3)(d)\n",
    "\n",
    "  model = keras.Model(inputs=inputs, outputs=outputs)\n",
    "  model.compile(\n",
    "      optimizer=keras.optimizers.Adam(1e-2),\n",
    "      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),\n",
    "      metrics=[keras.metrics.SparseCategoricalAccuracy()])\n",
    "\n",
    "  model.summary(print_fn=logging.info)\n",
    "  return model\n",
    "\n",
    "\n",
    "# TFX Trainer will call this function.\n",
    "def run_fn(fn_args: tfx.components.FnArgs):\n",
    "  \"\"\"Train the model based on given args.\n",
    "\n",
    "  Args:\n",
    "    fn_args: Holds args used to train the model as name/value pairs.\n",
    "  \"\"\"\n",
    "\n",
    "  # This schema is usually either an output of SchemaGen or a manually-curated\n",
    "  # version provided by pipeline author. A schema can also derived from TFT\n",
    "  # graph if a Transform component is used. In the case when either is missing,\n",
    "  # `schema_from_feature_spec` could be used to generate schema from very simple\n",
    "  # feature_spec, but the schema returned would be very primitive.\n",
    "  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)\n",
    "\n",
    "  train_dataset = _input_fn(\n",
    "      fn_args.train_files,\n",
    "      fn_args.data_accessor,\n",
    "      schema,\n",
    "      batch_size=_TRAIN_BATCH_SIZE)\n",
    "  eval_dataset = _input_fn(\n",
    "      fn_args.eval_files,\n",
    "      fn_args.data_accessor,\n",
    "      schema,\n",
    "      batch_size=_EVAL_BATCH_SIZE)\n",
    "\n",
    "  model = _make_keras_model()\n",
    "  model.fit(\n",
    "      train_dataset,\n",
    "      steps_per_epoch=fn_args.train_steps,\n",
    "      validation_data=eval_dataset,\n",
    "      validation_steps=fn_args.eval_steps)\n",
    "\n",
    "  # The result of the training should be saved in `fn_args.serving_model_dir`\n",
    "  # directory.\n",
    "  model.save(fn_args.serving_model_dir, save_format='tf')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "-LsYx8MpYvPv"
   },
   "source": [
    "Copy the module file to GCS which can be accessed from the pipeline components.\n",
    "Because model training happens on GCP, we need to upload this model definition. \n",
    "\n",
    "Otherwise, you might want to build a container image including the module file\n",
    "and use the image to run the pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "rMMs5wuNYAbc",
    "tags": []
   },
   "outputs": [],
   "source": [
    "!gcloud storage cp {_trainer_module_file} {MODULE_ROOT}/"   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "w3OkNz3gTLwM"
   },
   "source": [
    "### Write a pipeline definition\n",
    "\n",
    "We will define a function to create a TFX pipeline."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "M49yYVNBTPd4",
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and\n",
    "# slightly modified because we don't need `metadata_path` argument.\n",
    "\n",
    "def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,\n",
    "                     module_file: str, serving_model_dir: str,\n",
    "                     ) -> tfx.dsl.Pipeline:\n",
    "  \"\"\"Creates a three component penguin pipeline with TFX.\"\"\"\n",
    "  # Brings data into the pipeline.\n",
    "  example_gen = tfx.components.CsvExampleGen(input_base=data_root)\n",
    "\n",
    "  # Uses user-provided Python function that trains a model.\n",
    "  trainer = tfx.components.Trainer(\n",
    "      module_file=module_file,\n",
    "      examples=example_gen.outputs['examples'],\n",
    "      train_args=tfx.proto.TrainArgs(num_steps=100),\n",
    "      eval_args=tfx.proto.EvalArgs(num_steps=5))\n",
    "\n",
    "  # Pushes the model to a filesystem destination.\n",
    "  pusher = tfx.components.Pusher(\n",
    "      model=trainer.outputs['model'],\n",
    "      push_destination=tfx.proto.PushDestination(\n",
    "          filesystem=tfx.proto.PushDestination.Filesystem(\n",
    "              base_directory=serving_model_dir)))\n",
    "\n",
    "  # Following three components will be included in the pipeline.\n",
    "  components = [\n",
    "      example_gen,\n",
    "      trainer,\n",
    "      pusher,\n",
    "  ]\n",
    "\n",
    "  return tfx.dsl.Pipeline(\n",
    "      pipeline_name=pipeline_name,\n",
    "      pipeline_root=pipeline_root,\n",
    "      components=components)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "mJbq07THU2GV"
   },
   "source": [
    "## Run the pipeline on Vertex Pipelines.\n",
    "\n",
    "TFX provides multiple orchestrators to run your pipeline. In this tutorial we\n",
    "will use the Vertex Pipelines together with the Kubeflow V2 dag runner."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "7mp0AkmrPdUb"
   },
   "source": [
    "We need to define a runner to actually run the pipeline. You will compile\n",
    "your pipeline into our pipeline definition format using TFX APIs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "fAtfOZTYWJu-",
    "tags": []
   },
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'\n",
    "\n",
    "runner = tfx.orchestration.experimental.KubeflowV2DagRunner(\n",
    "    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),\n",
    "    output_filename=PIPELINE_DEFINITION_FILE)\n",
    "# Following function will write the pipeline definition to PIPELINE_DEFINITION_FILE.\n",
    "_ = runner.run(\n",
    "    _create_pipeline(\n",
    "        pipeline_name=PIPELINE_NAME,\n",
    "        pipeline_root=PIPELINE_ROOT,\n",
    "        data_root=DATA_ROOT,\n",
    "        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),\n",
    "        serving_model_dir=SERVING_MODEL_DIR))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "fWyITYSDd8w4"
   },
   "source": [
    "The generated definition file can be submitted using kfp client."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "id": "tI71jlEvWMV7"
   },
   "outputs": [],
   "source": [
    "# docs_infra: no_execute\n",
    "from google.cloud import aiplatform\n",
    "from google.cloud.aiplatform import pipeline_jobs\n",
    "\n",
    "aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)\n",
    "\n",
    "job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,\n",
    "                                display_name=PIPELINE_NAME)\n",
    "job.run(sync=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "id": "L3k9f5IVQXcQ"
   },
   "source": [
    "Visit __Vertex AI > Pipelines__ in your Google Cloud Console page to see the progress.\n",
    "\n",
    "Click on your `penguin-vertex-pipelines-xxx` run:\n",
    "\n",
    "![pipeline_start](01_pipeline_start.png)\n",
    "\n",
    "Explore the information displayed in each step while you wait for the job to progress.\n",
    "\n",
    "On completion, your pipeline UI should look similar to this:\n",
    "\n",
    "![pipeline_end](02_pipeline_completed.png)\n",
    "\n",
    "This job will take about 15 minutes in total to complete. Once complete, return to the lab to check your progress."
   ]
  }
 ],
 "metadata": {
  "colab": {
   "collapsed_sections": [
    "pknVo1kM2wI2"
   ],
   "name": "Simple TFX Pipeline for Vertex Pipelines",
   "provenance": [],
   "toc_visible": true
  },
  "environment": {
   "kernel": "python3",
   "name": "tf2-cpu.2-7.m87",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/tf2-cpu.2-7:m87"
  },
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
